package com.cluster.hadoop.api;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;
import java.util.Properties;

/**
 * MapReduce driver that counts input records per month and submits the job to the cluster.
 *
 * <p>Each input line is split on tabs; the second column is split on {@code '-'} and its first
 * two components are joined with {@code '_'} to form the output key. The reducer sums the
 * occurrences of every key.
 */
public class CountByMonthMRDriver01 {

    /** Jar handed to the job explicitly; see the note in {@link #main(String[])}. */
    private static final String LOCAL_JOB_JAR =
            "F:\\IdeaProjects\\bigdata-learn\\out\\artifacts\\hdfs_operation_jar\\hdfs-operation.jar";

    /** Location of the input data. */
    private static final String INPUT_PATH = "hdfs://hadoop1:9000/dxg/simple_data_news";

    /** Location the job writes to; it is deleted recursively before every run. */
    private static final String OUTPUT_PATH = "hdfs://hadoop1:9000/dxg/out";

    /**
     * Emits {@code (<part0>_<part1>, 1)} for every well-formed input line, where the parts come
     * from splitting the second tab-separated column on {@code '-'}.
     */
    public static class CountByMonthMRDriver01Mapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private static final String COUNTER_GROUP = "CountByMonth";
        private static final String MALFORMED_COUNTER = "MALFORMED_RECORDS";

        // Writables are reused across calls to avoid allocating two objects per record.
        private final Text monthKey = new Text();
        private final IntWritable one = new IntWritable(1);

        /**
         * Maps one input line to a month key with a count of one.
         *
         * <p>Lines that do not contain the expected columns are skipped and counted in the
         * {@code CountByMonth/MALFORMED_RECORDS} job counter instead of being silently dropped.
         *
         * @param key     input key supplied by the input format (not used)
         * @param value   one line of input text
         * @param context task context used to emit the output pair
         * @throws IOException          if writing the output pair fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String month = toMonthKey(value.toString());
            if (month == null) {
                context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
                return;
            }
            monthKey.set(month);
            context.write(monthKey, one);
        }

        /**
         * Extracts the month key from a raw input line.
         *
         * @param line raw tab-separated input line
         * @return the key, or {@code null} when the line has no second column or that column has
         *         fewer than two {@code '-'}-separated components
         */
        private static String toMonthKey(String line) {
            String[] row = line.split("\t");
            if (row.length < 2) {
                return null;
            }
            String[] dateParts = row[1].split("-");
            if (dateParts.length < 2) {
                return null;
            }
            return dateParts[0] + "_" + dateParts[1];
        }
    }

    /** Sums the counts emitted for each month key. */
    public static class CountByMonthMRDriver01Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        // Reused across calls to avoid allocating one object per key.
        private final IntWritable total = new IntWritable();

        /**
         * Writes the key together with the sum of all its values.
         *
         * @param key     month key produced by the mapper
         * @param values  counts emitted for this key
         * @param context task context used to emit the result
         * @throws IOException          if writing the result fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            total.set(sum);
            context.write(key, total);
        }
    }

    /**
     * Configures the job, clears the previous output directory, submits the job and exits with
     * status 0 on success or 1 on failure.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if job setup, output cleanup or job execution fails
     */
    public static void main(String[] args) throws Exception {

        // NOTE(review): presumably makes the Hadoop client act as "root" — confirm against the
        // cluster's authentication setup.
        Properties systemProperties = System.getProperties();
        systemProperties.setProperty("HADOOP_USER_NAME", "root");

        Configuration cfg = new Configuration();
        cfg.set("mapreduce.app-submission.cross-platform", "true");

        Job job = Job.getInstance(cfg);
        // NOTE(review): the explicit jar appears to be a fallback for launching from an IDE;
        // setJarByClass is expected to replace it only when this class was loaded from a jar — confirm.
        job.setJar(LOCAL_JOB_JAR);
        job.setJarByClass(CountByMonthMRDriver01.class);
        job.setMapperClass(CountByMonthMRDriver01Mapper.class);
        job.setReducerClass(CountByMonthMRDriver01Reduce.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Tell the cluster where the input comes from (other sources could be HBase, Redis, MySQL, S3, ...).
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, new Path(INPUT_PATH));

        // Output destination (could likewise be HBase, MySQL, ...).
        job.setOutputFormatClass(TextOutputFormat.class);
        Path outputPath = new Path(OUTPUT_PATH);
        // Resolve the file system from the path's own URI rather than from the configuration's
        // default, so the check works even when the default file system is not this cluster.
        // The instance is deliberately not closed here, since the job is submitted right after.
        FileSystem fs = outputPath.getFileSystem(cfg);
        if (fs.exists(outputPath)) {
            // The job refuses to start if the output directory already exists.
            fs.delete(outputPath, true);
        }
        TextOutputFormat.setOutputPath(job, outputPath);

        // Submit the job and block until it finishes.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
